package com.parkingwang.learning.operators;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.functions.Action;
import io.reactivex.functions.BooleanSupplier;
import io.reactivex.functions.Consumer;

import java.util.ArrayList;
import java.util.concurrent.*;

public class CreateOperator {

    public static void main(String[] args) {

//        createOper();

        ArrayList<Integer> list = new ArrayList<>();
        list.add(10);
        list.add(11);
        list.add(12);
        list.add(13);


//        justOper(list);

//        fromOper(list);

        repeatOper(list);


//        deferOper();


//        intervalOper();


//        timerOper();


    }


    /**
     * 延迟一段时间后发射数据0
     */
    private static void timerOper() {
        Observable.timer(3, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println(aLong + "-----hello");
                    }
                });

        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 按间隔发送数据
     * 相当于定时器
     * 如果没有初始延迟值，默认直接发送数据，没有延迟发送
     */
    private static void intervalOper() {
        Observable.interval(1, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println(10 - aLong);
                    }
                });
        try {
            Thread.sleep(12000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }

    /**
     * 直到有订阅者订阅才生成observable
     * <p>
     * 为每一个observer 生成单独的序列
     */
    private static void deferOper() {
        Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
            @Override
            public ObservableSource<String> call() throws Exception {
                return Observable.just("defer just");
            }
        });

        observable.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    }

    /**
     * 重复发射原始数据
     *
     * @param list
     */
    private static void repeatOper(ArrayList<Integer> list) {
//        Observable.just(list)
//                .repeat(3)
//                .subscribe(new Consumer<ArrayList<Integer>>() {
//                    @Override
//                    public void accept(ArrayList<Integer> integers) throws Exception {
//                        for (int i = 0; i < integers.size(); i++) {
//                            System.out.println(i);
//                        }
//                        System.out.println("--------");
//                    }
//                });

        //repeatWhen 有条件的重新发射和订阅数据
//        Observable.just(1, 2, 3)
//                .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
//                    @Override
//                    public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
//                        //5s后重新发送一回数据
//                        return Observable.timer(5, TimeUnit.SECONDS);
//                    }
//                }).subscribe(new Consumer<Integer>() {
//            @Override
//            public void accept(Integer integer) throws Exception {
//                System.out.println(integer);
//            }
//        });

        //repearUntil   getAsBoolean()返回true 将不再发射数据，否则一直重复发射数据
        long startTime = System.currentTimeMillis();

        Observable.just(1, 2, 3)
                .repeatUntil(new BooleanSupplier() {
                    @Override
                    public boolean getAsBoolean() throws Exception {
                        return System.currentTimeMillis() - startTime > 3000;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });


        try {
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }

    /**
     * from
     * <p>
     * 可以将Future,Iterable和数组转换为Observable
     *
     * @param list
     */
    private static void fromOper(ArrayList<Integer> list) {


//        Observable.fromIterable(list)
//                .subscribe(new Consumer<Integer>() {
//                    @Override
//                    public void accept(Integer integer) throws Exception {
//                        System.out.println(integer);
//                    }
//                });

        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<String> future = executorService.submit(new MyCallable());

        //from  还有两个可选参数，超时时长和时间单位，一旦超过超时时长，就会发射错误通知终止
        Observable.fromFuture(future, 2, TimeUnit.SECONDS)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        System.out.println(throwable.toString());
                    }
                });
    }


    static class MyCallable implements Callable<String> {
        @Override
        public String call() throws Exception {

            System.out.println("开始耗时操作...");

            Thread.sleep(3000);

            return "ok";
        }
    }


    /**
     * 原样输出
     *
     * @param list
     */
    private static void justOper(ArrayList<Integer> list) {
        Observable.just(list)
                .subscribe(new Consumer<ArrayList<Integer>>() {
                    @Override
                    public void accept(ArrayList<Integer> integers) throws Exception {
                    }
                });
    }

    /**
     * create()  先检查isDisposed状态，在没有观察者的时候停止发射数据
     */
    private static void createOper() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                if (!emitter.isDisposed()) {
                    emitter.onNext(66);
                    emitter.onComplete();
                }
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {

            }
        }, new Action() {
            @Override
            public void run() throws Exception {

            }
        });
    }

}
